'use strict'

const { expect } = require('chai')
const log = require('log').get('serverless:test')
const fixtures = require('../../fixtures/programmatic')

const {
  createKinesisStream,
  deleteKinesisStream,
  putKinesisRecord,
} = require('../../utils/kinesis')
const { putDynamoDbItem } = require('../../utils/dynamodb')
const { confirmCloudWatchLogs } = require('../../utils/misc')
const { deployService, removeService } = require('../../utils/integration')

describe('AWS - Stream Integration Test', function () {
  this.timeout(1000 * 60 * 100) // Involves time-taking deploys
  let stackName
  let serviceDir
  let streamName
  let tableName
  const historicStreamMessage = 'Hello from the Kinesis horizon!'
  const stage = 'dev'

  before(async () => {
    const serviceData = await fixtures.setup('stream')
    ;({ servicePath: serviceDir } = serviceData)
    const serviceName = serviceData.serviceConfig.service

    streamName = `${serviceName}-kinesis`
    tableName = `${serviceName}-table`
    stackName = `${serviceName}-${stage}`
    // create existing SQS queue
    // NOTE: deployment can only be done once the SQS queue is created
    log.notice(`Creating Kinesis stream "${streamName}"...`)
    return createKinesisStream(streamName)
      .then(() => putKinesisRecord(streamName, historicStreamMessage))
      .then(() => deployService(serviceDir))
  })

  after(async () => {
    await removeService(serviceDir)
    log.notice('Deleting Kinesis stream')
    return deleteKinesisStream(streamName)
  })

  describe('Kinesis Streams', () => {
    it('should invoke on kinesis messages from the trim horizon', async () => {
      const functionName = 'streamKinesis'
      const message = 'Hello from Kinesis!'

      return confirmCloudWatchLogs(
        `/aws/lambda/${stackName}-${functionName}`,
        () => putKinesisRecord(streamName, message),
        {
          checkIsComplete: (events) =>
            events
              .reduce((data, event) => data + event.message, '')
              .includes(message),
        },
      ).then((events) => {
        const logs = events.reduce((data, event) => data + event.message, '')
        expect(logs).to.include(functionName)
        expect(logs).to.include(message)
        expect(logs).to.include(historicStreamMessage)
      })
    })
  })

  describe('DynamoDB Streams', () => {
    it('should invoke on dynamodb messages from the latest position', async () => {
      const functionName = 'streamDynamoDb'
      const item = { id: String(Date.now()) }
      return confirmCloudWatchLogs(
        `/aws/lambda/${stackName}-${functionName}`,
        () => {
          item.hello = `from dynamo!${Math.random().toString(36).slice(2)}`
          return putDynamoDbItem(tableName, item)
        },
        {
          checkIsComplete: (events) =>
            events
              .reduce((data, event) => data + event.message, '')
              .includes(functionName),
        },
      ).then((events) => {
        const logs = events.reduce((data, event) => data + event.message, '')

        expect(logs).to.include(functionName)
        expect(logs).to.include('INSERT')
        expect(logs).to.include(item.id)
      })
    })
  })
})
